In [ ]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('2.1. Google Cloud Storage (CSV) & Spark DataFrames') \
.getOrCreate()
In [2]:
# Enable eager evaluation so that DataFrames display as formatted tables in the notebook
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
List files in a Google Cloud Storage bucket using the google-cloud-storage Python library, which comes pre-installed on Dataproc clusters. We will be using a publicly available dataset.
In [3]:
from google.cloud import storage
gcs_client = storage.Client()
bucket = gcs_client.bucket('solutions-public-assets')
list(bucket.list_blobs(prefix='time-series-master/'))
Out[3]:
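list_blobs returns Blob objects; if you only want the object names and sizes rather than the full object representations, you can iterate over the result, for example:
In [ ]:
for blob in bucket.list_blobs(prefix='time-series-master/'):
    print(blob.name, blob.size)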
Alternatively, use the hdfs command to list the files in a directory; it also supports GCS buckets.
In [4]:
!hdfs dfs -ls 'gs://solutions-public-assets/time-series-master'
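The gsutil CLI, which is typically available on Dataproc cluster nodes, offers another way to list the same objects:
In [ ]:
!gsutil ls 'gs://solutions-public-assets/time-series-master/'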
In [5]:
df1 = spark \
.read \
.option("inferSchema", "true") \
.option("header", "true") \
.csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")
df1.printSchema()
In [17]:
df1
Out[17]:
If there is no header row with column names, as is the case with this dataset, or the schema is not inferred correctly, read the CSV files from GCS and define the schema explicitly.
In [6]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, DateType
schema = StructType([
StructField("venue", StringType()),
StructField("currencies", StringType()),
StructField("time_stamp", TimestampType()),
StructField("bid", DoubleType()),
StructField("ask", DoubleType())
])
df2 = spark \
.read \
.schema(schema) \
.csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")
df2.printSchema()
View the top 20 rows of the Spark DataFrame.
In [7]:
df2
Out[7]:
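Displaying the bare df2 variable renders a table only because eager evaluation was enabled earlier via spark.sql.repl.eagerEval.enabled; without that setting, the equivalent explicit call is:
In [ ]:
df2.show(20)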
Print the shape of the DataFrame (number of rows and number of columns).
In [8]:
print((df2.count(), len(df2.columns)))
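Beyond the raw shape, a quick way to sanity-check the numeric columns is describe, which computes count, mean, stddev, min and max; a minimal sketch:
In [ ]:
df2.describe('bid', 'ask').show()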
Add an hour column and filter the data to create a new DataFrame containing only one day of data.
In [9]:
import pyspark.sql.functions as F
df3 = df2.withColumn("hour", F.hour(F.col("time_stamp"))) \
.filter(df2['time_stamp'] >= F.lit('2014-01-01 00:00:00')) \
.filter(df2['time_stamp'] < F.lit('2014-01-02 00:00:00')).cache()
df3
Out[9]:
In [10]:
print((df3.count(), len(df3.columns)))
Group by hour and order by total_bids.
In [11]:
import pyspark.sql.functions as F
df4 = df3 \
.groupBy("hour") \
.agg(F.sum('bid').alias('total_bids'))
df4.orderBy('total_bids', ascending=False)
Out[11]:
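agg accepts more than one aggregate expression, so several per-hour metrics can be computed in a single pass; a minimal sketch:
In [ ]:
df3 \
.groupBy("hour") \
.agg(F.sum('bid').alias('total_bids'),
     F.avg('bid').alias('avg_bid'),
     F.max('ask').alias('max_ask')) \
.orderBy('hour')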
In [12]:
# Update to your GCS bucket
gcs_bucket = 'dataproc-bucket-name'
gcs_filepath = 'gs://{}/currency/hourly_bids.csv'.format(gcs_bucket)
# coalesce(1) writes a single output file; include the header row so it can be read back with header=true
df4.coalesce(1).write \
.mode('overwrite') \
.option('header', 'true') \
.csv(gcs_filepath)
Read the CSV file into a new DataFrame to check it was successfully saved.
In [13]:
!hdfs dfs -ls gs://dataproc-bucket-name/currency
In [14]:
df5 = spark.read \
.option("inferSchema", "true") \
.option("header", "true") \
.csv('gs://dataproc-bucket-name/currency/*')
df5
Out[14]:
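As a quick check that the round trip preserved the aggregated data, the shape of df5 should match the shape of df4:
In [ ]:
print((df5.count(), len(df5.columns)))
print((df4.count(), len(df4.columns)))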